1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  import static rx.Observable.concat;
19  import static rx.Observable.just;
20  import static rx.Observable.zip;
21  import rx.Observable;
22  import rx.functions.Func1;
23  import rx.functions.Func2;
24  import rx.internal.util.UtilityFunctions;
25  
26  /**
27   * Returns an {@link Observable} that emits a single {@code Boolean} value that indicates whether two source
28   * {@code Observable}s emit sequences of items that are equivalent to each other.
29   */
30  public final class OperatorSequenceEqual {
31      private OperatorSequenceEqual() {
32          throw new IllegalStateException("No instances!");
33      }
34  
35      /** NotificationLite doesn't work as zip uses it. */
36      private static final Object LOCAL_ONCOMPLETED = new Object();
37      static <T> Observable<Object> materializeLite(Observable<T> source) {
38          return concat(
39                  source.map(new Func1<T, Object>() {
40  
41                      @Override
42                      public Object call(T t1) {
43                          return t1;
44                      }
45  
46                  }), just(LOCAL_ONCOMPLETED));
47      }
48  
49      /**
50       * Tests whether two {@code Observable} sequences are identical, emitting {@code true} if both sequences
51       * complete without differing, and {@code false} if the two sequences diverge at any point.
52       *
53       * @param first
54       *      the first of the two {@code Observable}s to compare
55       * @param second
56       *      the second of the two {@code Observable}s to compare
57       * @param equality
58       *      a function that tests emissions from each {@code Observable} for equality
59       * @return an {@code Observable} that emits {@code true} if {@code first} and {@code second} complete
60       *         after emitting equal sequences of items, {@code false} if at any point in their sequences the
61       *         two {@code Observable}s emit a non-equal item.
62       */
63      public static <T> Observable<Boolean> sequenceEqual(
64              Observable<? extends T> first, Observable<? extends T> second,
65              final Func2<? super T, ? super T, Boolean> equality) {
66          Observable<Object> firstObservable = materializeLite(first);
67          Observable<Object> secondObservable = materializeLite(second);
68  
69          return zip(firstObservable, secondObservable,
70                  new Func2<Object, Object, Boolean>() {
71  
72                      @Override
73                      @SuppressWarnings("unchecked")
74                      public Boolean call(Object t1, Object t2) {
75                          boolean c1 = t1 == LOCAL_ONCOMPLETED;
76                          boolean c2 = t2 == LOCAL_ONCOMPLETED;
77                          if (c1 && c2) {
78                              return true;
79                          }
80                          if (c1 || c2) {
81                              return false;
82                          }
83                          // Now t1 and t2 must be 'onNext'.
84                          return equality.call((T)t1, (T)t2);
85                      }
86  
87                  }).all(UtilityFunctions.<Boolean> identity());
88      }
89  }